/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.broker.topic;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
import com.alibaba.rocketmq.common.ConfigManager;
import com.alibaba.rocketmq.common.DataVersion;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.protocol.body.KVTable;
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;

/**
 * Manages the broker's topic configuration.
 * <p>
 * Keeps the in-memory topic table and its {@link DataVersion}, creates topic
 * configurations on demand while messages are being sent, and plugs into
 * {@link ConfigManager} through {@link #encode()}, {@link #decode(String)} and
 * {@link #configFilePath()}.
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @author lansheng.zj@taobao.com
 * @since 2013-7-26
 */
public class TopicConfigManager extends ConfigManager {
	private static final Logger log = LoggerFactory.getLogger(LoggerName.BrokerLoggerName);
	/** Maximum time, in milliseconds, to wait for {@link #lockTopicConfigTable}. */
	private static final long LockTimeoutMillis = 3000;
	/** Guards the "check, create, persist" sequence of the on-demand topic creation methods. */
	private transient final Lock lockTopicConfigTable = new ReentrantLock();
	private transient BrokerController brokerController;

	// Topic configurations, keyed by topic name
	private final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(1024);
	private final DataVersion dataVersion = new DataVersion();

	// Names of the topics registered by the constructor (only written there)
	private final Set<String> systemTopicList = new HashSet<String>();

	/**
	 * Creates a manager without a broker controller and without any predefined
	 * topic. Methods that dereference the controller cannot be used on such an
	 * instance.
	 */
	public TopicConfigManager() {
	}

	/**
	 * Creates a manager bound to the given broker and registers the built-in
	 * system topics.
	 * 
	 * @param brokerController
	 *            the owning broker controller; its broker config is read here
	 */
	public TopicConfigManager(BrokerController brokerController) {
		this.brokerController = brokerController;
		{
			// MixAll.SELF_TEST_TOPIC
			String topic = MixAll.SELF_TEST_TOPIC;
			TopicConfig topicConfig = new TopicConfig(topic);
			topicConfig.setReadQueueNums(1);
			topicConfig.setWriteQueueNums(1);
			this.registerSystemTopic(topic, topicConfig);
		}
		{
			// MixAll.DEFAULT_TOPIC, only when automatic topic creation is enabled
			if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
				String topic = MixAll.DEFAULT_TOPIC;
				TopicConfig topicConfig = new TopicConfig(topic);
				topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
				topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
				int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
				topicConfig.setPerm(perm);
				this.registerSystemTopic(topic, topicConfig);
			}
		}
		{
			// MixAll.BENCHMARK_TOPIC
			String topic = MixAll.BENCHMARK_TOPIC;
			TopicConfig topicConfig = new TopicConfig(topic);
			topicConfig.setReadQueueNums(1024);
			topicConfig.setWriteQueueNums(1024);
			this.registerSystemTopic(topic, topicConfig);
		}
		{
			// Topic named after the cluster
			String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
			TopicConfig topicConfig = new TopicConfig(topic);
			int perm = PermName.PERM_INHERIT;
			if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
				perm |= PermName.PERM_READ | PermName.PERM_WRITE;
			}
			topicConfig.setPerm(perm);
			this.registerSystemTopic(topic, topicConfig);
		}
		{
			// Topic named after the broker
			String topic = this.brokerController.getBrokerConfig().getBrokerName();
			TopicConfig topicConfig = new TopicConfig(topic);
			int perm = PermName.PERM_INHERIT;
			if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
				perm |= PermName.PERM_READ | PermName.PERM_WRITE;
			}
			topicConfig.setReadQueueNums(1);
			topicConfig.setWriteQueueNums(1);
			topicConfig.setPerm(perm);
			this.registerSystemTopic(topic, topicConfig);
		}
		{
			// MixAll.OFFSET_MOVED_EVENT
			String topic = MixAll.OFFSET_MOVED_EVENT;
			TopicConfig topicConfig = new TopicConfig(topic);
			topicConfig.setReadQueueNums(1);
			topicConfig.setWriteQueueNums(1);
			this.registerSystemTopic(topic, topicConfig);
		}
	}

	/**
	 * Records {@code topic} as a system topic and stores its configuration in
	 * the topic table under the configuration's own topic name.
	 */
	private void registerSystemTopic(final String topic, final TopicConfig topicConfig) {
		this.systemTopicList.add(topic);
		this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
	}

	/**
	 * @param topic
	 *            topic name to test
	 * @return {@code true} if the topic was registered as a system topic by
	 *         the constructor
	 */
	public boolean isSystemTopic(final String topic) {
		return this.systemTopicList.contains(topic);
	}

	/**
	 * @return the live (not copied) set of system topic names
	 */
	public Set<String> getSystemTopic() {
		return this.systemTopicList;
	}

	/**
	 * Tells whether messages may be sent to the given topic. The default topic
	 * and the topic named after the broker cluster are reserved.
	 * 
	 * @param topic
	 *            topic name; must not be {@code null}
	 * @return {@code false} for the two reserved names, {@code true} otherwise
	 */
	public boolean isTopicCanSendMessage(final String topic) {
		boolean reservedWords = topic.equals(MixAll.DEFAULT_TOPIC) || topic.equals(this.brokerController.getBrokerConfig().getBrokerClusterName());

		return !reservedWords;
	}

	/**
	 * @param topic
	 *            topic name
	 * @return the configuration stored for the topic, or {@code null} if none
	 */
	public TopicConfig selectTopicConfig(final String topic) {
		return this.topicConfigTable.get(topic);
	}

	/**
	 * Looks up the configuration of {@code topic} while handling a send
	 * request and, if it does not exist, tries to create it from the
	 * configuration of {@code defaultTopic}.
	 * <p>
	 * A new topic is only created when the default topic exists and carries
	 * the inherit permission. The new configuration is stored, the data
	 * version is advanced, the table is persisted, and afterwards
	 * {@code registerBrokerAll} is invoked on the broker controller.
	 * 
	 * @param topic
	 *            topic the message is sent to
	 * @param defaultTopic
	 *            topic whose configuration serves as template
	 * @param remoteAddress
	 *            producer address, used for logging only
	 * @param clientDefaultTopicQueueNums
	 *            queue count requested by the client; capped by the write
	 *            queue count of the default topic and floored at 0
	 * @param topicSysFlag
	 *            system flag to store on a newly created topic
	 * @return the existing or newly created configuration, or {@code null} if
	 *         the topic could not be created, the lock was not obtained within
	 *         {@link #LockTimeoutMillis}, or the thread was interrupted
	 */
	public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
		TopicConfig topicConfig = null;
		boolean createNew = false;

		try {
			if (this.lockTopicConfigTable.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					// Return the existing configuration if the topic is already known
					topicConfig = this.topicConfigTable.get(topic);
					if (topicConfig != null) {
						return topicConfig;
					}

					// Per the original author's note, producers send "TBW102" as defaultTopic
					// by default, and that topic is created when the broker starts.
					TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
					if (defaultTopicConfig != null) {
						if (PermName.isInherited(defaultTopicConfig.getPerm())) {
							topicConfig = new TopicConfig(topic);

							// Use the smaller of the client's request and the template's write queues, never below 0
							int queueNums = Math.max(0, Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums()));

							topicConfig.setReadQueueNums(queueNums);
							topicConfig.setWriteQueueNums(queueNums);
							// The new topic inherits the template's permissions except the inherit bit itself
							int perm = defaultTopicConfig.getPerm();
							perm &= ~PermName.PERM_INHERIT;
							topicConfig.setPerm(perm);
							topicConfig.setTopicSysFlag(topicSysFlag);
							topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
						} else {
							log.warn("create new topic failed, because the default topic[" + defaultTopic + "] no perm, " + defaultTopicConfig.getPerm() + " producer: " + remoteAddress);
						}
					} else {
						log.warn("create new topic failed, because the default topic[" + defaultTopic + "] not exist." + " producer: " + remoteAddress);
					}

					if (topicConfig != null) {
						log.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig + " producer: " + remoteAddress);
						// Store the new configuration under the topic name
						this.topicConfigTable.put(topic, topicConfig);
						// Advance the data version
						this.dataVersion.nextVersion();

						createNew = true;
						// Persist the topic table
						this.persist();
					}
				} finally {
					this.lockTopicConfigTable.unlock();
				}
			}
		} catch (InterruptedException e) {
			log.error("createTopicInSendMessageMethod exception", e);
			// Keep the interrupt visible to the caller instead of swallowing it
			Thread.currentThread().interrupt();
		}

		// Done outside the lock so the registration call does not extend the critical section
		if (createNew) {
			this.brokerController.registerBrokerAll(false, true);
		}

		return topicConfig;
	}

	/**
	 * Looks up the configuration of {@code topic} while handling a message
	 * sent back after a consume failure and creates it if it does not exist.
	 * 
	 * @param topic
	 *            topic name
	 * @param clientDefaultTopicQueueNums
	 *            read and write queue count for a newly created topic
	 * @param perm
	 *            permission bits for a newly created topic
	 * @param topicSysFlag
	 *            system flag for a newly created topic
	 * @return the existing or newly created configuration, or {@code null} if
	 *         the lock was not obtained within {@link #LockTimeoutMillis} or
	 *         the thread was interrupted
	 */
	public TopicConfig createTopicInSendMessageBackMethod(//
			final String topic, //
			final int clientDefaultTopicQueueNums, //
			final int perm, //
			final int topicSysFlag) {
		// Fast path without taking the lock
		TopicConfig topicConfig = this.topicConfigTable.get(topic);
		if (topicConfig != null) {
			return topicConfig;
		}

		boolean createNew = false;

		try {
			if (this.lockTopicConfigTable.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					// Re-check under the lock: another thread may have created the topic meanwhile
					topicConfig = this.topicConfigTable.get(topic);
					if (topicConfig != null) {
						return topicConfig;
					}

					topicConfig = new TopicConfig(topic);
					topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
					topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
					topicConfig.setPerm(perm);
					topicConfig.setTopicSysFlag(topicSysFlag);

					log.info("create new topic {}", topicConfig);
					this.topicConfigTable.put(topic, topicConfig);
					createNew = true;
					this.dataVersion.nextVersion();
					// Persist the topic table
					this.persist();
				} finally {
					this.lockTopicConfigTable.unlock();
				}
			}
		} catch (InterruptedException e) {
			log.error("createTopicInSendMessageBackMethod exception", e);
			// Keep the interrupt visible to the caller instead of swallowing it
			Thread.currentThread().interrupt();
		}

		if (createNew) {
			this.brokerController.registerBrokerAll(false, true);
		}

		return topicConfig;
	}

	/**
	 * Sets or clears the unit flag of a topic. Does nothing if the topic is
	 * unknown; otherwise the change is versioned, persisted and followed by a
	 * {@code registerBrokerAll} call.
	 * 
	 * @param topic
	 *            topic name
	 * @param unit
	 *            {@code true} to set the unit flag, {@code false} to clear it
	 */
	public void updateTopicUnitFlag(final String topic, final boolean unit) {

		TopicConfig topicConfig = this.topicConfigTable.get(topic);
		if (topicConfig != null) {
			int oldTopicSysFlag = topicConfig.getTopicSysFlag();
			if (unit) {
				topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
			} else {
				topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
			}

			log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag, topicConfig.getTopicSysFlag());

			this.topicConfigTable.put(topic, topicConfig);

			this.dataVersion.nextVersion();

			this.persist();
			this.brokerController.registerBrokerAll(false, true);
		}
	}

	/**
	 * Marks a topic as having a unit subscription group. Does nothing if the
	 * topic is unknown; otherwise the table is versioned, persisted and
	 * followed by a {@code registerBrokerAll} call.
	 * <p>
	 * NOTE(review): when {@code hasUnitSub} is {@code false} the flag is left
	 * untouched (it is never cleared here), yet the version is still advanced
	 * and the table persisted. Confirm whether that is intended.
	 * 
	 * @param topic
	 *            topic name
	 * @param hasUnitSub
	 *            {@code true} to set the unit-subscription flag
	 */
	public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
		TopicConfig topicConfig = this.topicConfigTable.get(topic);
		if (topicConfig != null) {
			int oldTopicSysFlag = topicConfig.getTopicSysFlag();
			if (hasUnitSub) {
				topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
			}

			log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag, topicConfig.getTopicSysFlag());

			this.topicConfigTable.put(topic, topicConfig);

			this.dataVersion.nextVersion();

			this.persist();
			this.brokerController.registerBrokerAll(false, true);
		}
	}

	/**
	 * Inserts or replaces the configuration stored under the topic name of
	 * {@code topicConfig}, then advances the data version and persists the
	 * table.
	 * 
	 * @param topicConfig
	 *            configuration to store
	 */
	public void updateTopicConfig(final TopicConfig topicConfig) {
		TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
		if (old != null) {
			log.info("update topic config, old: " + old + " new: " + topicConfig);
		} else {
			log.info("create new topic, " + topicConfig);
		}
		// Advance the data version
		this.dataVersion.nextVersion();
		// Persist the topic table
		this.persist();
	}

	/**
	 * Synchronizes the "order" attribute of the local topics with the order
	 * topic table obtained from the name server: topics listed there are
	 * marked ordered, all other local topics are marked unordered. The table
	 * is versioned and persisted only if something changed.
	 * 
	 * @param orderKVTableFromNs
	 *            order topic table from the name server; ignored when it or
	 *            its table is {@code null}
	 */
	public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
		if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
			boolean isChange = false;
			Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();
			for (String topic : orderTopics) {
				TopicConfig topicConfig = this.topicConfigTable.get(topic);
				if (topicConfig != null && !topicConfig.isOrder()) {
					topicConfig.setOrder(true);
					isChange = true;
					log.info("update order topic config, topic={}, order={}", topic, true);
				}
			}
			// Iterate entries rather than keys + get(): a topic deleted concurrently
			// would make get() return null, while entry values are never null.
			for (Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
				String topic = entry.getKey();
				TopicConfig topicConfig = entry.getValue();
				if (!orderTopics.contains(topic) && topicConfig.isOrder()) {
					topicConfig.setOrder(false);
					isChange = true;
					log.info("update order topic config, topic={}, order={}", topic, false);
				}
			}
			if (isChange) {
				this.dataVersion.nextVersion();
				this.persist();
			}
		}
	}

	/**
	 * @param topic
	 *            topic name
	 * @return {@code true} if the topic exists and is marked ordered
	 */
	public boolean isOrderTopic(final String topic) {
		TopicConfig topicConfig = this.topicConfigTable.get(topic);
		return topicConfig != null && topicConfig.isOrder();
	}

	/**
	 * Removes the configuration of a topic. When something was removed the
	 * data version is advanced and the table persisted; otherwise a warning is
	 * logged.
	 * 
	 * @param topic
	 *            topic name
	 */
	public void deleteTopicConfig(final String topic) {
		TopicConfig old = this.topicConfigTable.remove(topic);
		if (old != null) {
			log.info("delete topic config OK, topic: " + old);
			this.dataVersion.nextVersion();
			this.persist();
		} else {
			log.warn("delete topic config failed, topic: " + topic + " not exist");
		}
	}

	/**
	 * @return a new wrapper referencing (not copying) the live topic table and
	 *         data version
	 */
	public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
		TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
		topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
		topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
		return topicConfigSerializeWrapper;
	}

	/**
	 * @return the topic table and data version as compact JSON
	 */
	@Override
	public String encode() {
		return encode(false);
	}

	/**
	 * @param prettyFormat
	 *            passed through to the wrapper's {@code toJson}
	 * @return the topic table and data version as JSON
	 */
	public String encode(final boolean prettyFormat) {
		return this.buildTopicConfigSerializeWrapper().toJson(prettyFormat);
	}

	/**
	 * Merges the topics contained in {@code jsonString} into the topic table
	 * and takes over the contained data version. A {@code null} string, or one
	 * that deserializes to {@code null}, is ignored.
	 * 
	 * @param jsonString
	 *            JSON form of a {@link TopicConfigSerializeWrapper}
	 */
	@Override
	public void decode(String jsonString) {
		if (jsonString != null) {
			TopicConfigSerializeWrapper topicConfigSerializeWrapper = TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
			if (topicConfigSerializeWrapper != null) {
				this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
				this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
				this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);
			}
		}
	}

	/**
	 * Logs every topic configuration contained in the given wrapper.
	 */
	private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {
		for (Entry<String, TopicConfig> entry : tcs.getTopicConfigTable().entrySet()) {
			log.info("load exist local topic, {}", entry.getValue().toString());
		}
	}

	/**
	 * @return the topic config path derived from the store root directory of
	 *         the broker's message store config
	 */
	@Override
	public String configFilePath() {
		return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
	}

	/**
	 * @return the live data version of the topic table
	 */
	public DataVersion getDataVersion() {
		return dataVersion;
	}

	/**
	 * @return the live (not copied) topic table
	 */
	public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
		return topicConfigTable;
	}
}
